iT邦幫忙

2025 iThome 鐵人賽

DAY 1
10
AI & Data

「知其然,更知其所以然:什麼是 Real-time (Streaming) Pipeline?從造輪子到 Flink 與 RisingWave」系列 第 1

【知其然,更知其所以然】Day 1: Data Pipeline 為什麼需要 Streaming?用 Batch 不行嗎?

  • 分享至 

  • xImage
  •  

https://ithelp.ithome.com.tw/upload/images/20250821/20124758l5ouBXh3kd.png
AI 時代到來,數據驅動不再只是「資料多」,而是要「即時可用」。推薦系統、fraud detection、AI decision engine,這些需求都希望資料進來就能立刻用。Stream Processing(串流處理) 因此從選擇題,變成現代數據平台的必備核心能力。

相比傳統的 Batch Processing(批次處理),Stream Processing 能夠:

  • 消除延遲:不用等資料累積到一定量才處理,資料一來就處理
  • 提供即時決策能力:讓系統能在秒級內做出商業決策

什麼是 Streaming Pipeline?

Streaming Pipeline(串流數據管道)是實現 Stream Processing 的完整架構體系。它包含了從數據接收、處理、到輸出的整個流程,讓我們能夠建構出穩定可靠的即時數據處理系統。

Stream Processing 的核心特色:

  • 即時處理:數據一進入系統就立即處理,延遲通常在秒級或毫秒級
  • 連續性:持續不斷地處理數據流,而非等待批次累積
  • 事件驅動:基於事件觸發處理邏輯,適合實時決策場景
  • 狀態管理:能維護處理過程中的狀態,支援複雜的聚合運算

Streaming Pipeline 技術架構

┌─────────────┐    ┌──────────────┐
│ Data Sources│───▶│ Message Queue│
│             │    │              │
│• App Logs   │    │• Kafka       │
│• IoT Events │    │• Pulsar      │
│• User Acts  │    │• RabbitMQ    │
│• DB Changes │    │              │
└─────────────┘    └──────────────┘
                    ┌──────────────────┐    ┌────────────┐
───────────────────▶│Stream Processing │───▶│Output Sinks│
                    │Engine            │    │            │
                    │• Flink           │    │• Database  │
                    │• RisingWave      │    │• Dashboard │
                    │• Spark Streaming │    │• Warehouse │
                    │                  │    │• Alerts    │
                    └──────┬───────────┘    └────────────┘
                           ▼
                    ┌────────────┐
                    │State Store │
                    │            │
                    │• RocksDB   │
                    │• Memory    │
                    │• Checkpoint│
                    └────────────┘

核心組件說明:

  1. 數據源(Data Sources):應用日誌、IoT 感測器、用戶行為事件、資料庫變更等
  2. 消息佇列(Message Queue):Kafka、Pulsar 等,提供可靠的數據緩衝與分發
  3. 流處理引擎(Stream Processing Engine):這是整個架構的核心,負責執行實際的 Stream Processing 邏輯,如 Flink、RisingWave、Spark Streaming 等
  4. 狀態存儲(State Store):Stream Processing 需要維護狀態來支援 windowing、aggregation 等操作
  5. 輸出端(Output Sinks):處理結果的目的地,如資料庫、數據倉儲、即時儀表板、告警系統等

Stream Processing Engine 演進之路

這一系列三十天文章,筆者會專注於 Stream Processing Engine 這個核心組件,用自己實際的經驗,深入探討不同 Stream Processing Engine 的特色與演進。經歷了從 手寫 Consumer → Flink → RisingWave 的技術轉變,會詳細分享每個 Stream Processing Engine 的實作心得、優缺點分析,以及選型考量。

起點:手寫 consumer(Python)

當時團隊最初的「即時報表」解法,就是手寫 Kafka consumer。嚴格來說,這還不算是真正的 Stream Processing Engine,只是最基礎的消息消費邏輯。

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers='localhost:9092',
)
order_count = {}

for msg in consumer:
    order = json.loads(msg.value)
    merchant = order['merchant_id']
    order_count[merchant] = order_count.get(merchant, 0) + 1

這種方式雖然簡單,但 Kafka offset、錯誤處理、狀態維護全要自己處理,麻煩又易壞。缺乏 Stream Processing Engine 應有的 fault tolerance、state management、windowing 等核心功能

下一步:Flink(PyFlink + Flink SQL)

導入 Flink 這個成熟的 Stream Processing Engine 後,事情變得專業多了。
Flink 作為業界領先的 Stream Processing Engine,幫我們處理了 Kafka offset、watermark、checkpoint 等核心功能,讓大規模 Stream Processing 變得穩定可靠。

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.in_streaming_mode()
)

t_env.execute_sql("""
  CREATE TABLE orders (
    merchant_id STRING
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
  )
""")

t_env.execute_sql("""
  CREATE TABLE order_summary 
  WITH ('connector'='print') AS
  SELECT merchant_id, COUNT(*) 
  FROM orders GROUP BY merchant_id
""")

不過 Flink 這個 Stream Processing Engine 仍需要較高的工程門檻,例如 performance tuning、cluster 部署、state backend 調整等。

最新一站:RisingWave(Kafka source + Materialized View)

最近使用 RisingWave 這個新一代 Stream Processing Engine 時,發現 Stream Processing 真的「簡化很多」。RisingWave 同樣是完整的 Stream Processing Engine,但提供了更簡潔的使用方式,只需要用 SQL 就能完成複雜的流處理邏輯。

CREATE SOURCE orders (
  merchant_id VARCHAR
) WITH (
  connector = 'kafka',
  topic = 'orders',
  properties.bootstrap.servers = 'localhost:9092',
  format = 'json'
);

CREATE MATERIALIZED VIEW order_summary AS
SELECT merchant_id, COUNT(*) AS order_count
FROM orders
GROUP BY merchant_id;

相比 Flink,RisingWave 這個 Stream Processing Engine 的優勢很明顯:

  • SQL 即可搞定複雜的 Stream Processing 邏輯
  • 自動管理 checkpoint、state backend 等 Stream Processing Engine 的核心功能
  • 用戶只需要 focus 在資料與邏輯,Stream Processing Engine 的 infra 細節幾乎都 abstract away
  • Cloud-native 架構,compute/storage 可分離、彈性擴展

對熟悉 SQL 的 data team 來說,這個現代化的 Stream Processing Engine 門檻低非常多。

總結

從手寫 Consumer → Flink → RisingWave,這段旅程正好反映了 Stream Processing Engine 的演進趨勢:

  • 越來越穩定:從手動管理狀態到自動化的 fault tolerance
  • 越來越易用:從複雜的程式碼到 SQL-first 的開發體驗
  • 越來越平民化:越來越接近「資料團隊人人都能用」的目標

在 AI 時代,Stream Processing Engine 會變成數據平台的核心,而選型標準也正在往「更低門檻、更快上手」方向演進。

接下來會深入探討各種 Stream Processing Engine 的技術細節、選型考量、最佳實務,以及未來趨勢,敬請期待!


下一篇
【知其然,更知其所以然】Day 2:第一直覺可能會用 HTTP 做 Streaming?
系列文
「知其然,更知其所以然:什麼是 Real-time (Streaming) Pipeline?從造輪子到 Flink 與 RisingWave」14
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言